package org.codehaus.activemq.service.impl;

import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.filter.Filter;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.service.Dispatcher;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.QueueList;
import org.codehaus.activemq.service.QueueListEntry;
import org.codehaus.activemq.service.TopicMessageContainer;

public class DurableTopicSubscription extends SubscriptionImpl
{
  private static final Log log = LogFactory.getLog(DurableTopicSubscription.class);
  private String persistentKey;

  public DurableTopicSubscription(Dispatcher dispatcher, ConsumerInfo info, Filter filter)
  {
    super(dispatcher, info, filter);
  }

  public synchronized void messageConsumed(MessageAck ack) throws JMSException {
    if ((!ack.isMessageRead()) && (!isBrowser())) {
      super.messageConsumed(ack);
    }
    else {
      Map lastMessagePointersPerContainer = new HashMap();

      boolean found = false;
      QueueListEntry queueEntry = this.messagePtrs.getFirstEntry();
      while (queueEntry != null) {
        MessagePointer pointer = (MessagePointer)queueEntry.getElement();

        this.messagePtrs.remove(queueEntry);
        lastMessagePointersPerContainer.put(pointer.getContainer(), pointer);
        this.unconsumedMessagesDispatched.decrement();

        if (pointer.getMessageIdentity().equals(ack.getMessageIdentity())) {
          found = true;
          break;
        }
        queueEntry = this.messagePtrs.getNextEntry(queueEntry);
      }
      if (!found) {
        log.warn("Did not find a matching message for identity: " + ack.getMessageIdentity());
      }

      for (Iterator iter = lastMessagePointersPerContainer.entrySet().iterator(); iter.hasNext(); ) {
        Map.Entry entry = (Map.Entry)iter.next();
        TopicMessageContainer container = (TopicMessageContainer)entry.getKey();
        MessagePointer pointer = (MessagePointer)entry.getValue();
        container.setLastAcknowledgedMessageID(this, pointer.getMessageIdentity());
      }

      this.dispatch.wakeup(this);
    }
  }

  public synchronized void redeliverMessage(MessageContainer container, MessageAck ack) throws JMSException
  {
    MessagePointer pointer = new MessagePointer(container, ack.getMessageIdentity());
    this.messagePtrs.add(pointer);
    this.unconsumedMessagesDispatched.increment();
  }

  public String getPersistentKey() {
    if (this.persistentKey == null) {
      this.persistentKey = ("[" + getClientId() + ":" + getSubscriberName() + "]");
    }
    return this.persistentKey;
  }
}